Skip to content

Feat/healthy checks#79

Merged
sangalo20 merged 11 commits into
mainfrom
feat/healthy-checks
May 19, 2026
Merged

Feat/healthy checks#79
sangalo20 merged 11 commits into
mainfrom
feat/healthy-checks

Conversation

@sangalo20
Copy link
Copy Markdown
Contributor

@sangalo20 sangalo20 commented May 19, 2026

Pull Request

Description

Type of Change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update
  • Refactor

Changes Made

How to Test

Migration / Breaking Changes

  • Database migration required — include the migration file path
  • No migration required

Checklist

  • Code follows the Go styleguide (gofmt applied)
  • Commit messages use present tense, imperative mood, ≤72 chars
  • Documentation updated where applicable
  • No secrets or credentials committed

sangalo20 added 4 commits May 18, 2026 17:42
- Cross-reference provider health before expiring connections
- Add bounded concurrency (semaphore) to both health workers
- Add graceful shutdown signal handling for --worker-only mode
- Add DB index on connections(status, last_health_check_at)
- Add GET /connections?workspace_id= endpoint for frontend
- Surface health_status on GET /connections/{id}/token response
- Fix null JSON response, inconsistent logging, trailing newline
Copilot AI review requested due to automatic review settings May 19, 2026 05:04
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds provider- and connection-level health tracking to nexus-broker, including persisted health fields, background workers to update them, and new HTTP endpoints to surface health state (plus version bump + docs).

Changes:

  • Add DB schema for provider/connection health (health_status, last_health_check_at, etc.) plus an index to support efficient polling.
  • Introduce background workers: provider.HealthWorker and service.ConnectionHealthWorker (bounded concurrency), and wire them up in cmd/nexus-broker.
  • Add API surface: GET /providers/health, GET /connections?workspace_id=..., and include health_status in token responses.

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
VERSION Bumps version to 0.2.4.
CHANGELOG.md Adds 0.2.4 release notes for health checks + worker mode.
docs/healthchecks.md Documents the health check architecture and phases.
nexus-broker/pkg/storage/pg.go Extends storage models with provider/connection health fields.
nexus-broker/pkg/provider/store.go Adds provider health fields to profile reads + adds GetAllProfiles and UpdateHealthStatus.
nexus-broker/pkg/provider/store_test.go Updates store tests for added provider health columns.
nexus-broker/pkg/provider/interfaces.go Extends ProfileStorer with GetAllProfiles().
nexus-broker/pkg/provider/health.go Adds HealthWorker to probe providers and persist health status.
nexus-broker/pkg/provider/health_test.go Adds unit tests for provider health probing logic.
nexus-broker/pkg/handlers/providers.go Adds GET /providers/health endpoint.
nexus-broker/pkg/handlers/providers_test.go Updates mock store with GetAllProfiles.
nexus-broker/pkg/handlers/connections.go Adds GET /connections?workspace_id=... endpoint.
nexus-broker/pkg/handlers/consent_test.go Updates mock ConnectionService to include ListConnections.
nexus-broker/pkg/handlers/soc2_compliance_test.go Updates mocked query columns to include health_status.
nexus-broker/migrations/13_add_provider_health.sql Adds provider profile health columns.
nexus-broker/migrations/14_add_connection_health.sql Adds connection health columns.
nexus-broker/migrations/15_add_connection_health_index.sql Adds partial/composite index for health polling query.
nexus-broker/internal/domain/models.go Adds health fields to domain.Connection + introduces ConnectionSummary.
nexus-broker/internal/repository/interfaces.go Extends ConnectionRepository with health polling/update + workspace listing.
nexus-broker/internal/repository/postgres/connection.go Implements health polling, health updates, and workspace connection summaries.
nexus-broker/internal/repository/instrumented/instrumented.go Instruments new repository methods.
nexus-broker/internal/service/connection.go Adds health_status to token response + adds ListConnections.
nexus-broker/internal/service/connection_test.go Updates mocks for new repository/store interface methods.
nexus-broker/internal/service/connection_health.go Adds ConnectionHealthWorker that checks active connections and updates health/status.
nexus-broker/internal/service/connection_health_test.go Adds unit tests for connection health worker behavior.
nexus-broker/cmd/nexus-broker/main.go Wires new endpoints + starts both workers; adds --worker-only SIGTERM/SIGINT handling.
Comments suppressed due to low confidence (1)

CHANGELOG.md:26

  • This changelog entry references refactoring connection_part2.go into credential.go, but connection_part2.go doesn’t exist in the repo. Please correct/remove this line so the release notes accurately reflect the codebase history.
### Changed
- **Service Layer**: Refactored `connection_part2.go` into `credential.go`, separating credential capture, token refresh, and credential validation by responsibility.
- **HTTP Client**: `validateCredentials`, `refreshTokens`, and `executeExchange` now use the centrally injected `httpClient` instead of creating inline clients, ensuring the configured transport is respected across all outbound calls.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +134 to +142
// For OAuth2, attempt a token refresh. The service layer already has this logic.
// If it succeeds, the refresh token is valid (healthy).
// If it fails with invalid_grant, it's expired.
_, err := w.connSvc.Refresh(ctx, c.ID)
if err != nil {
// The caller (runChecks) will cross-reference provider health
// before committing an "expired" status to the database.
return "expired"
}
Comment on lines +163 to +175
// This is a simplified application of the strategy. A full implementation would
// use the bridge's `auth.ApplyAuthentication` but we are inside the broker here.
// For API Key / Bearer, it's usually just a header.
if c.AuthType == "api_key" || c.AuthType == "basic_auth" {
// Assuming the token service returned it as a flat map
for _, v := range credentials {
if strVal, ok := v.(string); ok {
// Very naive injection just for the health check.
// In reality, this requires interpreting the provider's strategy config.
req.Header.Set("Authorization", "Bearer "+strVal)
req.Header.Set("X-API-Key", strVal)
}
}
Comment on lines +178 to +179
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
} else {
log.Printf("ConnectionHealthWorker: Connection %s for provider %s is %s", c.ID, c.ProviderName, status)
// Note: In a full implementation we should also write to the audit log here
_ = w.connRepo.UpdateStatus(checkCtx, c.ID, "expired")
Comment on lines +132 to +152
for dbRows.Next() {
var conn domain.ConnectionWithProvider
err := dbRows.Scan(
&conn.ID, &conn.WorkspaceID, &conn.ProviderID, pq.Array(&conn.Scopes), &conn.ReturnURL, &conn.Status, &conn.ExpiresAt,
&conn.LastHealthCheckAt, &conn.HealthStatus,
&conn.ProviderName, &conn.AuthType, &conn.AuthHeader, &conn.APIBaseURL, &conn.UserInfoEndpoint, &conn.ProviderParams,
)
if err != nil {
return nil, err
}
rows = append(rows, conn)
}

// Returning pointers as per interface
var ptrRows []*domain.ConnectionWithProvider
for i := range rows {
ptrRows = append(ptrRows, &rows[i])
}

return ptrRows, nil
}
Comment thread docs/healthchecks.md Outdated
Comment on lines +80 to +82
1. **Connection Verifier Worker:** Built a new `ConnectionHealthWorker` that uses jittered polling to iterate through active `connections`.
2. **Credential Validation:** For `api_key` or `basic_auth` connections, it periodically decrypts the credential and makes a lightweight, read-only request (e.g., `GET /v1/users/me`). For `oauth2`, it attempts a background token refresh.
3. **State Management:** If the request returns `401 Unauthorized` (or `invalid_grant`), it automatically flips the connection status to `expired`.
Comment on lines 118 to 134
@@ -116,6 +128,7 @@ func main() {
r.Delete("/{id}", providersHandler.Delete)
})
protected.Post("/auth/consent-spec", consentHandler.GetSpec)
protected.Get("/connections", connectionsHandler.List)
protected.Get("/connections/resolve", callbackHandler.ResolveToken)
protected.Get("/connections/{connectionID}/token", callbackHandler.GetToken)
protected.Post("/connections/{connectionID}/refresh", callbackHandler.Refresh)
Comment on lines +20 to +37
// List handles GET /connections?workspace_id=ws-123
// Returns all non-pending connections for a workspace with health status.
func (h *ConnectionsHandler) List(w http.ResponseWriter, r *http.Request) {
workspaceID := r.URL.Query().Get("workspace_id")

if workspaceID == "" {
httputil.WriteError(w, http.StatusBadRequest, "missing_workspace_id", "workspace_id query parameter is required")
return
}

connections, err := h.svc.ListConnections(r.Context(), workspaceID)
if err != nil {
writeServiceError(w, err)
return
}

httputil.WriteJSON(w, http.StatusOK, connections)
}
Comment on lines +206 to +226
// Health handles GET /providers/health to list provider health statuses
func (h *ProvidersHandler) Health(w http.ResponseWriter, r *http.Request) {
profiles, err := h.store.GetAllProfiles()
if err != nil {
httputil.WriteError(w, http.StatusInternalServerError, "health_failed", "Failed to list providers health")
return
}

healthData := make([]map[string]interface{}, 0, len(profiles))
for _, p := range profiles {
healthData = append(healthData, map[string]interface{}{
"id": p.ID.String(),
"name": p.Name,
"health_status": p.HealthStatus,
"last_health_check_at": p.LastHealthCheckAt,
"health_message": p.HealthMessage,
})
}

httputil.WriteJSON(w, http.StatusOK, healthData)
}
Comment on lines +377 to +425
// GetAllProfiles retrieves all non-deleted provider profiles in full
func (s *Store) GetAllProfiles() ([]Profile, error) {
query := `
SELECT id, name, client_id, client_secret, auth_url, token_url, issuer,
enable_discovery, scopes, auth_type, COALESCE(auth_header, ''),
COALESCE(api_base_url, ''), COALESCE(user_info_endpoint, ''), params,
COALESCE(description, ''), COALESCE(category, ''), last_health_check_at,
COALESCE(health_status, 'unknown'), health_message
FROM provider_profiles
WHERE deleted_at IS NULL
`

rows, err := s.db.Query(query)
if err != nil {
return nil, fmt.Errorf("failed to query all profiles: %w", err)
}
defer rows.Close()

var profiles []Profile
for rows.Next() {
var p Profile
err := rows.Scan(
&p.ID, &p.Name, &p.ClientID, &p.ClientSecret, &p.AuthURL, &p.TokenURL,
&p.Issuer, &p.EnableDiscovery, pq.Array(&p.Scopes), &p.AuthType,
&p.AuthHeader, &p.APIBaseURL, &p.UserInfoEndpoint, &p.Params, &p.Description, &p.Category,
&p.LastHealthCheckAt, &p.HealthStatus, &p.HealthMessage,
)
if err != nil {
return nil, fmt.Errorf("failed to scan provider profile: %w", err)
}
profiles = append(profiles, p)
}

if err = rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating all profiles: %w", err)
}

return profiles, nil
}

// UpdateHealthStatus updates the health fields for a given provider profile
func (s *Store) UpdateHealthStatus(id uuid.UUID, status string, message *string) error {
query := `UPDATE provider_profiles SET health_status = $1, health_message = $2, last_health_check_at = NOW() WHERE id = $3 AND deleted_at IS NULL`
_, err := s.db.Exec(query, status, message, id)
if err != nil {
return fmt.Errorf("failed to update provider health status: %w", err)
}
return nil
}
- Focused on actual implementation: workers, endpoints, status values
- Removed planning/proposal language
- Added API examples with request/response shapes
- Added worker-only mode section and DB schema reference
- Wired into mkdocs Component Deep Dives nav

Closes #80
Copilot AI review requested due to automatic review settings May 19, 2026 05:14
…Checks to Concepts nav, bump version to 0.2.4
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 28 out of 28 changed files in this pull request and generated 14 comments.

Comments suppressed due to low confidence (1)

CHANGELOG.md:35

  • The changelog entry references changes/files that don't exist in this repo (e.g., connection_part2.go) and describes additional refactors/tests that are not part of this PR. This makes the release notes misleading; please limit this section to changes actually introduced in 0.2.4 (or update file names/descriptions to match reality).
### Changed
- **Service Layer**: Refactored `connection_part2.go` into `credential.go`, separating credential capture, token refresh, and credential validation by responsibility.
- **HTTP Client**: `validateCredentials`, `refreshTokens`, and `executeExchange` now use the centrally injected `httpClient` instead of creating inline clients, ensuring the configured transport is respected across all outbound calls.
- **Audit Interface**: `ConnectionService` now accepts the `audit.Logger` interface instead of a concrete `*audit.Service` pointer, enabling proper mocking in unit tests.
- **Method Promotion**: `validateCredentials` and `refreshTokens` promoted from standalone functions to methods on `connectionService` to allow struct field access.

### Added
- **Service Layer Tests**: 7 new unit tests covering the previously untested `SaveCredential`, `Refresh`, and `ExchangeCodeForTokens` methods, including OAuth2 flows validated against `httptest` mock servers.
- **SOC 2 Integration Tests**: Enterprise-grade compliance test suite (`soc_test.go`, `soc_livedb_test.go`) verifying encryption at rest (SOC-CTRL-01), immutable audit trail (SOC-CTRL-02), API key enforcement (SOC-CTRL-03), IP allowlisting (SOC-CTRL-04), and defense-in-depth middleware (SOC-CTRL-05).
- **Architecture Enforcement**: `TestSeparationOfConcerns` statically analyzes import paths via `go/parser` to enforce layer boundaries at CI time.
- **Docker Compose**: Local PostgreSQL and Redis containers for running live integration tests against a real database schema.

Comment on lines +163 to +175
// This is a simplified application of the strategy. A full implementation would
// use the bridge's `auth.ApplyAuthentication` but we are inside the broker here.
// For API Key / Bearer, it's usually just a header.
if c.AuthType == "api_key" || c.AuthType == "basic_auth" {
// Assuming the token service returned it as a flat map
for _, v := range credentials {
if strVal, ok := v.(string); ok {
// Very naive injection just for the health check.
// In reality, this requires interpreting the provider's strategy config.
req.Header.Set("Authorization", "Bearer "+strVal)
req.Header.Set("X-API-Key", strVal)
}
}
Comment on lines +101 to +109
} else {
log.Printf("ConnectionHealthWorker: Connection %s for provider %s is %s", c.ID, c.ProviderName, status)
// Note: In a full implementation we should also write to the audit log here
_ = w.connRepo.UpdateStatus(checkCtx, c.ID, "expired")
}
}

if err := w.connRepo.UpdateHealthStatus(checkCtx, c.ID, status); err != nil {
log.Printf("ConnectionHealthWorker: failed to update health status for conn %s: %v", c.ID, err)
// ProviderHealthLookup provides read-only access to provider health status.
// This avoids importing the full Store and keeps the dependency narrow.
type ProviderHealthLookup interface {
GetProfile(id uuid.UUID) (*provider.Profile, error)
Comment on lines +132 to +152
for dbRows.Next() {
var conn domain.ConnectionWithProvider
err := dbRows.Scan(
&conn.ID, &conn.WorkspaceID, &conn.ProviderID, pq.Array(&conn.Scopes), &conn.ReturnURL, &conn.Status, &conn.ExpiresAt,
&conn.LastHealthCheckAt, &conn.HealthStatus,
&conn.ProviderName, &conn.AuthType, &conn.AuthHeader, &conn.APIBaseURL, &conn.UserInfoEndpoint, &conn.ProviderParams,
)
if err != nil {
return nil, err
}
rows = append(rows, conn)
}

// Returning pointers as per interface
var ptrRows []*domain.ConnectionWithProvider
for i := range rows {
ptrRows = append(ptrRows, &rows[i])
}

return ptrRows, nil
}
Comment on lines +154 to +164
if isWorkerOnly {
log.Printf("Starting Nexus Broker in WORKER-ONLY mode")
log.Printf("Version: %s", Version)

// Wait for OS signal for graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigCh
log.Printf("Received signal %v, shutting down workers...", sig)
cleanupCancel()
} else {
Comment thread docs/healthchecks.md Outdated
nexus-broker --worker-only
```

In this mode, the HTTP server does not start. The process listens for `SIGINT`/`SIGTERM` and performs a graceful shutdown, draining any in-flight checks before exiting.
Comment on lines +377 to +425
// GetAllProfiles retrieves all non-deleted provider profiles in full
func (s *Store) GetAllProfiles() ([]Profile, error) {
query := `
SELECT id, name, client_id, client_secret, auth_url, token_url, issuer,
enable_discovery, scopes, auth_type, COALESCE(auth_header, ''),
COALESCE(api_base_url, ''), COALESCE(user_info_endpoint, ''), params,
COALESCE(description, ''), COALESCE(category, ''), last_health_check_at,
COALESCE(health_status, 'unknown'), health_message
FROM provider_profiles
WHERE deleted_at IS NULL
`

rows, err := s.db.Query(query)
if err != nil {
return nil, fmt.Errorf("failed to query all profiles: %w", err)
}
defer rows.Close()

var profiles []Profile
for rows.Next() {
var p Profile
err := rows.Scan(
&p.ID, &p.Name, &p.ClientID, &p.ClientSecret, &p.AuthURL, &p.TokenURL,
&p.Issuer, &p.EnableDiscovery, pq.Array(&p.Scopes), &p.AuthType,
&p.AuthHeader, &p.APIBaseURL, &p.UserInfoEndpoint, &p.Params, &p.Description, &p.Category,
&p.LastHealthCheckAt, &p.HealthStatus, &p.HealthMessage,
)
if err != nil {
return nil, fmt.Errorf("failed to scan provider profile: %w", err)
}
profiles = append(profiles, p)
}

if err = rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating all profiles: %w", err)
}

return profiles, nil
}

// UpdateHealthStatus updates the health fields for a given provider profile
func (s *Store) UpdateHealthStatus(id uuid.UUID, status string, message *string) error {
query := `UPDATE provider_profiles SET health_status = $1, health_message = $2, last_health_check_at = NOW() WHERE id = $3 AND deleted_at IS NULL`
_, err := s.db.Exec(query, status, message, id)
if err != nil {
return fmt.Errorf("failed to update provider health status: %w", err)
}
return nil
}
Comment on lines +206 to +226
// Health handles GET /providers/health to list provider health statuses
func (h *ProvidersHandler) Health(w http.ResponseWriter, r *http.Request) {
profiles, err := h.store.GetAllProfiles()
if err != nil {
httputil.WriteError(w, http.StatusInternalServerError, "health_failed", "Failed to list providers health")
return
}

healthData := make([]map[string]interface{}, 0, len(profiles))
for _, p := range profiles {
healthData = append(healthData, map[string]interface{}{
"id": p.ID.String(),
"name": p.Name,
"health_status": p.HealthStatus,
"last_health_check_at": p.LastHealthCheckAt,
"health_message": p.HealthMessage,
})
}

httputil.WriteJSON(w, http.StatusOK, healthData)
}
Comment on lines +20 to +37
// List handles GET /connections?workspace_id=ws-123
// Returns all non-pending connections for a workspace with health status.
func (h *ConnectionsHandler) List(w http.ResponseWriter, r *http.Request) {
workspaceID := r.URL.Query().Get("workspace_id")

if workspaceID == "" {
httputil.WriteError(w, http.StatusBadRequest, "missing_workspace_id", "workspace_id query parameter is required")
return
}

connections, err := h.svc.ListConnections(r.Context(), workspaceID)
if err != nil {
writeServiceError(w, err)
return
}

httputil.WriteJSON(w, http.StatusOK, connections)
}
Comment on lines +135 to +145
worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond) // Give it time to run at least once
cancel()

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
}
sangalo20 added 4 commits May 19, 2026 08:30
1. Granular OAuth2 health classification via RefreshResponse.StatusCode:
   - 400/401 → expired (definitive credential death)
   - 403     → degraded (scope issue)
   - 5xx     → unhealthy (transient upstream)
   - nil     → degraded (network/internal — can't determine)

2. Safe credential injection for API key/basic auth:
   - Extract explicit keys (api_key, username/password) instead of
     ranging over entire map which could inject unrelated fields
   - Respect provider's configured AuthHeader
   - Match validateCredentials pattern from credential.go

3. Handle UpdateStatus errors: if expiring fails, log and skip the
   health_status write to avoid inconsistent state

4. Reuse shared http.Client on worker struct for connection pooling

5. GetToken failures return degraded instead of expired

Added 3 new tests: Upstream5xx, 403 scope, NetworkError.
Code:
- Granular OAuth2 health via RefreshResponse.StatusCode (400/401→expired,
  403→degraded, 5xx→unhealthy, nil→degraded)
- Safe credential injection: extract explicit keys (api_key, username/
  password) instead of ranging over map
- Handle UpdateStatus errors: skip health write on failure
- Shared http.Client for connection pooling
- 3 new tests: Upstream5xx, 403 scope, NetworkError

Docs (healthchecks.md):
- Fixed 'jittered polling' → 'fixed ticker'
- Replaced simplified outcome table with granular status code matrix
- Added error handling and shared client notes
- Updated health_status value descriptions
Replace GetProfile (loads full Profile with client_secret, params, etc.)
with GetHealthStatus (SELECT only health_status) for the background
ConnectionHealthWorker's provider cross-reference.

Changes:
- Add Store.GetHealthStatus(uuid) (string, error) — single-column query
- Narrow ProviderHealthLookup interface to GetHealthStatus
- Update isProviderDown to use string status directly
- Remove provider package import from connection_health.go
- Add sqlmock tests for GetHealthStatus (success, not found)
- Update all mock expectations in connection_health_test.go
Store:
- Add ProviderHealthSummary struct and GetAllHealthStatuses() — narrow
  5-column query for /providers/health endpoint
- Add GetHealthStatus() to ProfileStorer interface
- Add rows.Err() check in GetForHealthCheck

Handler:
- Replace GetAllProfiles + manual map-building with GetAllHealthStatuses
- Fix error message grammar

Tests:
- Add GetAllHealthStatuses store tests (success, empty)
- Update Health handler tests to use new method
- Add GetAllHealthStatuses to MockProfileStorer

Docs:
- Fix duplicate section numbering (5→6) in broker.md
- Fix overstated graceful shutdown claim in healthchecks.md
@sangalo20 sangalo20 merged commit 1a90419 into main May 19, 2026
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants